[SPARK-56253][PYTHON][CONNECT] Make spark.read.json accept DataFrame input #55097
Yicong-Huang wants to merge 16 commits into apache:master
Conversation
```scala
def ds: Dataset[String] = {
  val input = transformRelation(rel.getInput)
  val inputSchema = Dataset.ofRows(session, input).schema
  require(inputSchema.fields.length == 1,
```
Maybe we should throw InvalidInputErrors as the others do.

As we don't want to validate on the Python side, all errors will be thrown on the Scala side, so in classic we will have UNSUPPORTED_DESERIALIZER errors. For parity tests, it might be better to keep the Connect error as UNSUPPORTED_DESERIALIZER too, instead of InvalidInputErrors?

I used InvalidInputErrors in the end.
```python
def test_json_with_dataframe_input_non_string_column(self):
    int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT")
    with self.assertRaises(Exception):
```
Consider using assertRaisesRegex(Exception, "exactly one column|StringType") to at least verify the error message content.

Thanks! I added more regex checks to make the test tighter.
```scala
val input = transformRelation(rel.getInput)
val inputSchema = Dataset.ofRows(session, input).schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
Dataset(session, input)(Encoders.STRING)
```
I added the checks here because otherwise INT can be implicitly cast to STRING.
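To illustrate the validation being discussed, here is a minimal plain-Python sketch of the same two checks (this is a hypothetical helper, not the actual Spark server code; the schema is modeled as a list of (column name, type name) pairs):

```python
# Hypothetical sketch of the schema validation above; not actual Spark code.
# A schema is modeled as a list of (column_name, type_name) pairs.
def validate_single_string_column(fields):
    if len(fields) != 1:
        raise ValueError(f"expected exactly one column, got {len(fields)}")
    name, data_type = fields[0]
    if data_type != "string":
        # Rejecting non-string types up front avoids an implicit cast
        # (e.g. INT silently cast to STRING).
        raise TypeError(f"column '{name}' must be StringType, got {data_type}")

validate_single_string_column([("value", "string")])  # passes
```

The point of checking the declared type, rather than letting the encoder handle it, is exactly the implicit-cast issue mentioned above.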
```scala
  reader
}
def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
def ds: Dataset[String] = {
```
Let's try to avoid creating the dataset twice. Analysis can be somewhat expensive. Do this instead:
```scala
val input = transformRelation(rel.getInput)
val df = Dataset.ofRows(session, input)
val inputSchema = df.schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
df.as(Encoders.STRING)
```

```scala
def ds: Dataset[String] = {
  val input = transformRelation(rel.getInput)
  val inputSchema = Dataset.ofRows(session, input).schema
  if (inputSchema.fields.length != 1) {
```
I am a bit on the fence about this one. It is fine to have multiple columns, as long as the first one is a string.

Hmm, just for clarification, what would be the behavior with multiple columns? Should we just take the first column and ignore the rest?
```scala
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
```
This is technically a behavior change.
```scala
if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
}
df.select(df.columns.head).as(Encoders.STRING)
```
You don't really have to add a projection here. df.as(Encoders.STRING) should work as well.

I tried removing the projection, but df.as(Encoders.STRING) on a multi-column DataFrame throws UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH because the STRING encoder expects exactly one column. So the projection is needed to support multi-column DataFrames (using the first column). I'll keep it as-is.

Re: "the projection is needed to support multi-column DataFrames (using the first column)" — I think it should fail in this case?

I think @hvanhovell wants to support the multi-column case (#55097 (comment)), but I am not sure how we should support multi-column input. Currently it silently drops columns after the first one when receiving more than one column. I could also change it to raise an exception. Or do we want to somehow join the remaining columns back after we parse the JSON from the first column? @hvanhovell @zhengruifeng, what do you think?

I personally feel it should just fail, but if we want to support multiple columns by accepting the first column, we need to document that behavior. Also cc @cloud-fan and @HyukjinKwon, WDYT?

Silently dropping things is an anti-pattern; let's fail explicitly.

OK, let me change it to fail in that case. Then we will not be able to support multi-column input.
hvanhovell left a comment:

LGTM - one small nit.
Merged to master.
```python
)
result = self.spark.read.json(multi_df)
expected = [Row(name="Alice"), Row(name="Bob")]
self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)
```
I think in this case it should fail?
…me input in spark.read.json

### What changes were proposed in this pull request?

Follow-up to #55097. Reject multi-column DataFrame input in `spark.read.json()` explicitly instead of silently using the first column and dropping the rest. Also renames error conditions and methods from `PARSE_INPUT_*` to `DATAFRAME_INPUT_*` since these are query compilation errors, not parse errors.

### Why are the changes needed?

Per review feedback on #55097 from cloud-fan and zhengruifeng: silently dropping columns is an anti-pattern. Multi-column DataFrame input should fail explicitly.

### Does this PR introduce _any_ user-facing change?

Yes. `spark.read.json(df)` now raises `DATAFRAME_INPUT_NOT_SINGLE_COLUMN` when the input DataFrame has more than one column (previously it silently used only the first column). Zero-column input now also raises `DATAFRAME_INPUT_NOT_SINGLE_COLUMN` instead of `PARSE_INPUT_NOT_STRING_TYPE`.

### How was this patch tested?

Updated existing tests in `test_datasources.py` (classic) and `test_connect_readwriter.py` (Connect) to verify that multi-column and zero-column input raises the expected error.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #55301 from Yicong-Huang/SPARK-56253-reject-multicol.

Authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…nput

### What changes were proposed in this pull request?

This PR adds support for passing a `DataFrame` containing CSV strings directly to `spark.read.csv()`, following the same pattern established by #55097 (SPARK-56253) for `spark.read.json()`.

### Why are the changes needed?

Adding DataFrame support to `csv()` makes the API consistent with `json()` and enables Connect-compatible CSV parsing without `sc.parallelize()`.

### Does this PR introduce _any_ user-facing change?

Yes. `spark.read.csv()` now accepts a `DataFrame` with a single string column as input, in addition to the existing `str`, `list`, and `RDD` inputs.

```python
csv_df = spark.createDataFrame([("Alice,25",), ("Bob,30",)], schema="value STRING")
spark.read.csv(csv_df, schema="name STRING, age INT").show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
```

### How was this patch tested?

Added 10 new test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55274 from Yicong-Huang/SPARK-56255.

Authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?

Allow `spark.read.json()` to accept a DataFrame as input, in addition to file paths and RDDs. The first column of the input DataFrame must be of StringType; additional columns are ignored.

Why are the changes needed?

Parsing in-memory JSON text into a structured DataFrame currently requires `sc.parallelize()`, which is unavailable on Spark Connect. Accepting a DataFrame as input provides a Connect-compatible alternative. This is the inverse of `DataFrame.toJSON()`. Part of SPARK-55227.

Does this PR introduce any user-facing change?

Yes. `spark.read.json()` now accepts a DataFrame as input. The first column must be StringType; additional columns are ignored.

How was this patch tested?

New tests in `test_datasources.py` (classic) and `test_connect_readwriter.py` (Connect).

Was this patch authored or co-authored using generative AI tooling?

No
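Conceptually, the feature parses each row's string value as one JSON document, the inverse of `DataFrame.toJSON()`. The following plain-Python sketch (standard-library only, not Spark code; the variable names are illustrative) shows the per-line parsing this corresponds to:

```python
import json

# Each element stands in for one row of the single string column that
# spark.read.json(df) would consume; each line is one JSON document.
json_lines = ['{"name": "Alice", "age": 25}', '{"name": "Bob", "age": 30}']

# Parse every line into a dict, the way the reader infers Row objects.
rows = [json.loads(line) for line in json_lines]
print(rows[0]["name"])  # Alice
```

On Spark, the same data would come from a DataFrame with schema `value STRING`, which is exactly what `sc.parallelize()` previously had to produce and what Connect could not.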